Relay WebSocket library refactor #1930
The overall architecture looks good, but I have some implementation recommendations. As a heads up, I haven't fully gone through this PR yet, so there might be other things I'll recommend in later reviews.
```go
if cc.compression {
	data = compressed.Bytes()
} else {
	data = notCompressed.Bytes()
}
```
I would say we shouldn't serialize both versions if we don't need to, but we expect compression to be the common case, and the `notCompressed` serialization is presumably required to generate the compressed output, so this might be fine as-is. Still, if it isn't too difficult, it might be worth only serializing the one we need.
I agree that the `serializeMessage` function is a little confusing. It was already there, so I haven't refactored it. It doesn't actually serialize both versions: it only serializes the one we have chosen, and the other returned value is an empty byte slice. I think that leads to confusion and it should be refactored, but I have chosen to leave it for now so as not to increase the scope of this change.

I'd be happy to refactor the function in this or a future PR if you'd like. FYI, from what I can see, the compressed output doesn't require anything from the `notCompressed` serialization, so it should be easy to split into two functions.
broadcaster/backlog/backlog.go
```go
	b.messageCount.Store(0)
	log.Warn(err.Error())
} else if errors.Is(err, errSequenceNumberSeen) {
	log.Info("ignoring message sequence number (%s), already in backlog", msg.SequenceNumber)
```
We should check that this message is the same as what we have in the buffer. If not, we need to clear the buffer (or preferably only clear this sequence number and anything after it), log an error saying that a reorg occurred, and then add this message.
Agreed, the `backlogSegment.append` method has an if statement with the following checks:

- Is this message the next number in the sequence (+1 from the previous number)?
- Else, is this message larger than the expected next number in the sequence? If it is, return the `errDropSegments` error.
- Else, this message must have already been seen, so return the `errSequenceNumberSeen` error.
Let me know if you think this approach is missing something :)
There's an edge case here which is reorgs. When a reorg occurs, a message can be replaced by a different message with the same sequence number. However, I'm beginning to think that what we should do in reorg cases is too ambiguous and error prone, and we should just rely on the L1 sequencer inbox to sort it out and establish a canonical message ordering, and then pick up the feed from there. I.e. the current approach should be fine. I'll probably discuss this with the nitro team and make sure this approach makes sense to everyone.
Btw, this `log.Info` line needs to use the key-value logging scheme instead of `%s`.
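For context, a geth-style logger (e.g. `github.com/ethereum/go-ethereum/log`, which nitro uses) takes a message followed by alternating key-value pairs rather than printf verbs. The stand-in `logInfo` below only sketches that calling convention; it is not the real logger.

```go
package main

import "fmt"

// logInfo is a stand-in for a geth-style key-value logger: a message
// followed by alternating keys and values, with no printf formatting.
func logInfo(msg string, ctx ...interface{}) string {
	out := msg
	for i := 0; i+1 < len(ctx); i += 2 {
		out += fmt.Sprintf(" %v=%v", ctx[i], ctx[i+1])
	}
	return out
}

func main() {
	// Instead of: log.Info("ignoring message sequence number (%s), ...", seqNum)
	// the sequence number is passed as a key-value pair:
	fmt.Println(logInfo("ignoring message, already in backlog", "sequenceNumber", uint64(42)))
	// → ignoring message, already in backlog sequenceNumber=42
}
```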
I just have a couple of last comments. Mainly, we need to be careful to avoid recursively calling `RLock`, due to the way Go implements mutexes.
```go
segment, err := b.Lookup(start)
if start < head.Start() {
	// doing this check after the Lookup call ensures there is no race
	// condition with a delete call
```
I'm not sure this quite works, because `head` was already loaded before in `b.head.Start()`. Maybe you could reload `head` after the `Lookup` call?
Good shout, changed
broadcaster/backlog/backlog.go
```diff
 noMsgs := []*m.BroadcastFeedMessage{}
-if start < s.start.Load() {
+if start < s.Start() {
```
We need to be careful about recursive read locks. They cause deadlocks:

> If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock.

https://pkg.go.dev/sync#RWMutex.RLock
I'd suggest having a lowercase `start()` function which doesn't lock the mutex and is used by both `Start()` and this function.
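A minimal sketch of that pattern, with assumed field names: the exported `Start` takes the read lock itself, while the lowercase `start` requires the caller to already hold it, so methods that hold the lock never re-enter `RLock`.

```go
package main

import (
	"fmt"
	"sync"
)

// backlogSegment sketches the suggested locked/unlocked accessor split.
// The fields here are assumptions for illustration only.
type backlogSegment struct {
	messagesLock sync.RWMutex
	startSeq     uint64
}

// Start is the public accessor: it acquires the read lock itself.
func (s *backlogSegment) Start() uint64 {
	s.messagesLock.RLock()
	defer s.messagesLock.RUnlock()
	return s.start()
}

// start must only be called while messagesLock is already held. Using it
// internally avoids the recursive RLock that can deadlock when another
// goroutine has a pending Lock call.
func (s *backlogSegment) start() uint64 {
	return s.startSeq
}

// Contains shows a method that already holds the lock calling start,
// not Start (which would be a recursive RLock).
func (s *backlogSegment) Contains(i uint64) bool {
	s.messagesLock.RLock()
	defer s.messagesLock.RUnlock()
	return i >= s.start()
}

func main() {
	s := &backlogSegment{startSeq: 5}
	fmt.Println(s.Start(), s.Contains(7), s.Contains(3))
}
```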
Good catch! I have changed it over
broadcaster/backlog/backlog.go
```diff
-return s.messages[startIndex:endIndex], nil
+tmp := make([]*m.BroadcastFeedMessage, len(s.messages))
+copy(tmp, s.messages)
+return tmp[startIndex:endIndex], nil
```
I doubt this would matter too much, but ideally we'd create `tmp` to be only `endIndex - startIndex` in size and copy just that section.
I actually tried to do that with:

```go
tmp := make([]*m.BroadcastFeedMessage, endIndex-startIndex)
copy(tmp, s.messages[startIndex:endIndex])
return tmp, nil
```
but for some reason it seemed to copy messages from the `s.messages` slice outside of the `startIndex` to `endIndex` range that I had specified. I'm not sure why; maybe I did something wrong. For now I will leave this as-is, since it fixed the errors I saw in the test.
Actually, I must have tried something else when copying just that section of the slice, because I added it now and it passes the tests. I think earlier I might have used a different length than `endIndex-startIndex` when making the `tmp` slice.
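That would explain the earlier confusion: Go's built-in `copy` copies `min(len(dst), len(src))` elements, so the destination's length controls how much is copied. A small self-contained demonstration (using `int` instead of `*m.BroadcastFeedMessage` for brevity):

```go
package main

import "fmt"

func main() {
	msgs := []int{10, 11, 12, 13, 14}
	startIndex, endIndex := 1, 4

	// Sizing tmp to the range and copying from the subslice yields
	// exactly the requested section:
	tmp := make([]int, endIndex-startIndex)
	copy(tmp, msgs[startIndex:endIndex])
	fmt.Println(tmp) // [11 12 13]

	// If tmp is instead sized to the whole slice and copied from the
	// whole slice, elements outside the range also end up in tmp's
	// backing array; slicing afterwards hides them but copies more:
	big := make([]int, len(msgs))
	copy(big, msgs)
	fmt.Println(big[startIndex:endIndex]) // [11 12 13]
}
```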
broadcaster/backlog/backlog.go
```go
s.messagesLock.RLock()
defer s.messagesLock.RUnlock()
start := s.Start()
if i < start || i > s.End() {
```
Same thing in this function about recursive locking
Good catch! I have changed it over
I think there are some calls to `End` that have the same issue.
LGTM
@PlasmaPower requested that the wsbroadcastserver library be refactored.

Currently the `ClientManager` uses a single goroutine to register each `ClientConnection`. Part of that process is sending the entire backlog, which could be very large. This means that while the `ClientManager` is blocked registering a `ClientConnection` and sending the entire backlog, other clients cannot connect.

The goal of the refactor is to move the logic that sends the backlog from the `ClientManager` goroutine to each `ClientConnection` goroutine. We must ensure that no messages are skipped when switching from the backlog to the `out` channel in each `ClientConnection`. We can send messages twice and the client should be able to handle it.
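The handoff described above can be sketched as follows. All names here are illustrative, not the PR's actual API: each connection goroutine first drains the backlog, then switches to its live channel, and because the two sources may overlap, duplicates are tolerated by tracking the last delivered sequence number.

```go
package main

import "fmt"

type message struct{ seq uint64 }

// sendBacklogThenLive sketches the per-connection handoff: drain the
// backlog first, then forward from the out channel. A message may
// appear in both sources during the overlap window, so anything at or
// below the last delivered sequence number is skipped rather than
// delivered twice.
func sendBacklogThenLive(backlog []message, out <-chan message, deliver func(message)) {
	var lastSeq uint64
	for _, m := range backlog {
		deliver(m)
		lastSeq = m.seq
	}
	for m := range out {
		if m.seq <= lastSeq {
			continue // duplicate from the overlap window: safe to skip
		}
		deliver(m)
		lastSeq = m.seq
	}
}

func main() {
	out := make(chan message, 4)
	// seq 3 overlaps with the backlog; it must not be delivered twice.
	for _, s := range []uint64{3, 4, 5} {
		out <- message{seq: s}
	}
	close(out)

	var delivered []uint64
	sendBacklogThenLive([]message{{1}, {2}, {3}}, out, func(m message) {
		delivered = append(delivered, m.seq)
	})
	fmt.Println(delivered) // [1 2 3 4 5]
}
```

This keeps the `ClientManager` free to register other connections while each `ClientConnection` goroutine streams its own backlog.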